package datahub

import (
	"bytes"
	"encoding/json"
	"fmt"
	"reflect"
	"strconv"
)

type AuthMode string

const (
	AK  AuthMode = "ak"
	STS AuthMode = "sts"
)

type ConnectorType string

const (
	SinkOdps     ConnectorType = "sink_odps"
	SinkOss      ConnectorType = "sink_oss"
	SinkEs       ConnectorType = "sink_es"
	SinkAds      ConnectorType = "sink_ads"
	SinkMysql    ConnectorType = "sink_mysql"
	SinkFc       ConnectorType = "sink_fc"
	SinkOts      ConnectorType = "sink_ots"
	SinkDatahub  ConnectorType = "sink_datahub"
	SinkHologres ConnectorType = "sink_hologres"
)

func (ct *ConnectorType) String() string {
	return string(*ct)
}

func validateConnectorType(ct ConnectorType) bool {
	switch ct {
	case SinkOdps, SinkOss, SinkEs, SinkAds, SinkMysql, SinkFc, SinkOts, SinkDatahub, SinkHologres:
		return true
	default:
		return false
	}
}

type ConnectorState string

const (
	ConnectorStopped ConnectorState = "CONNECTOR_STOPPED"
	ConnectorRunning ConnectorState = "CONNECTOR_RUNNING"
)

func validateConnectorState(ct ConnectorState) bool {
	switch ct {
	case ConnectorStopped, ConnectorRunning:
		return true
	default:
		return false
	}
}

type ConnectorTimestampUnit string

const (
	ConnectorMicrosecond ConnectorTimestampUnit = "MICROSECOND"
	ConnectorMillisecond ConnectorTimestampUnit = "MILLISECOND"
	ConnectorSecond      ConnectorTimestampUnit = "SECOND"
)

type ConnectorConfig struct {
	TimestampUnit ConnectorTimestampUnit `json:"TimestampUnit"`
}

type PartitionMode string

const (
	UserDefineMode PartitionMode = "USER_DEFINE"
	SystemTimeMode PartitionMode = "SYSTEM_TIME"
	EventTimeMode  PartitionMode = "EVENT_TIME"
)

func (pm *PartitionMode) String() string {
	return string(*pm)
}

func NewPartitionConfig() *PartitionConfig {
	pc := &PartitionConfig{
		ConfigMap: make([]map[string]string, 0),
	}
	return pc
}

type PartitionConfig struct {
	ConfigMap []map[string]string
}

func (pc *PartitionConfig) AddConfig(key, value string) {
	m := map[string]string{
		key: value,
	}
	pc.ConfigMap = append(pc.ConfigMap, m)
}

func (pc *PartitionConfig) MarshalJSON() ([]byte, error) {
	if pc == nil || len(pc.ConfigMap) == 0 {
		return nil, nil
	}
	buf := &bytes.Buffer{}
	buf.Write([]byte{'{'})

	length := len(pc.ConfigMap)
	for i, m := range pc.ConfigMap {
		for k, v := range m {
			if _, err := fmt.Fprintf(buf, "\"%s\":\"%s\"", k, v); err != nil {
				return nil, fmt.Errorf("partition config is invalid")
			}
		}
		if i < length-1 {
			buf.WriteByte(',')
		}
	}
	buf.WriteByte('}')

	return buf.Bytes(), nil
}

func (pc *PartitionConfig) UnmarshalJSON(data []byte) error {
	//the data is "xxxxxx",should convert to xxxx, remove the ""
	var str *string = new(string)
	if err := json.Unmarshal(data, str); err != nil {
		return err
	}

	confParser := make([]map[string]string, 0)
	if err := json.Unmarshal([]byte(*str), &confParser); err != nil {
		return err
	}
	confMap := make([]map[string]string, len(confParser))

	//convert {"key":"ds","value":"%Y%m%d",...} to {"ds":"%Y%m%d",...}
	for i, m := range confParser {
		confMap[i] = map[string]string{
			m["key"]: m["value"],
		}
	}
	pc.ConfigMap = confMap
	return nil
}

/** ODPS CONFIG **/
type SinkOdpsConfig struct {
	ConnectorConfig
	Endpoint        string          `json:"OdpsEndpoint"`
	Project         string          `json:"Project"`
	Table           string          `json:"Table"`
	AccessId        string          `json:"AccessId"`
	AccessKey       string          `json:"AccessKey"`
	TimeRange       int             `json:"TimeRange"`
	TimeZone        string          `json:"TimeZone,omitempty"`
	PartitionMode   PartitionMode   `json:"PartitionMode"`
	PartitionConfig PartitionConfig `json:"PartitionConfig"`
	TunnelEndpoint  string          `json:"TunnelEndpoint,omitempty"`
	SplitKey        string          `json:"SplitKey,omitempty"`
	Base64Encode    bool            `json:"Base64Encode,omitempty"`
}

func marshalCreateOdpsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkOdpsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOdpsConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		Type          string            `json:"Type"`
		SinkStartTime int64             `json:"SinkStartTime"`
		ColumnFields  []string          `json:"ColumnFields"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkOdpsConfig    `json:"Config"`
	}{
		Action:        ccr.Action,
		Type:          ccr.Type.String(),
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetOdpsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	//the api return TimeRange is string, so need to convert to int64
	type SinkOdpsConfigHelper struct {
		SinkOdpsConfig
		TimeRange string `json:"TimeRange"`
	}
	ct := &struct {
		GetConnectorResult
		Config SinkOdpsConfigHelper `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	conf := ct.Config.SinkOdpsConfig
	t, err := strconv.Atoi(ct.Config.TimeRange)
	if err != nil {
		return nil, err
	}
	conf.TimeRange = t

	ret := &ct.GetConnectorResult
	ret.Config = conf
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

// no config update
func marshalUpdateConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
	}{
		Action:        ucr.Action,
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
	}
	return json.Marshal(ct)
}

func marshalUpdateOdpsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkOdpsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOdpsConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkOdpsConfig    `json:"Config,omitempty"`
	}{
		Action:        ucr.Action,
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

/*  Oss Config */
type SinkOssConfig struct {
	ConnectorConfig
	Endpoint    string   `json:"Endpoint"`
	Bucket      string   `json:"Bucket"`
	Prefix      string   `json:"Prefix"`
	TimeFormat  string   `json:"TimeFormat"`
	TimeRange   int      `json:"TimeRange"`
	AuthMode    AuthMode `json:"AuthMode"`
	AccessId    string   `json:"AccessId,omitempty"`
	AccessKey   string   `json:"AccessKey,omitempty"`
	MaxFileSize int64    `json:"MaxFileSize,omitempty"`
}

func marshalCreateOssConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkOssConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOssConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		Type          ConnectorType     `json:"Type"`
		SinkStartTime int64             `json:"SinkStartTime"`
		ColumnFields  []string          `json:"ColumnFields"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkOssConfig     `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetOssConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	type SinkOssConfigHelper struct {
		SinkOssConfig
		TimeRange string `json:"TimeRange"`
	}
	ct := &struct {
		GetConnectorResult
		Config SinkOssConfigHelper `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	soConf := ct.Config.SinkOssConfig
	t, err := strconv.Atoi(ct.Config.TimeRange)
	if err != nil {
		return nil, err
	}
	soConf.TimeRange = t

	ret := &ct.GetConnectorResult
	ret.Config = soConf
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

func marshalUpdateOssConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkOssConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOssConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkOssConfig     `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

/*  mysql Config */
type SinkMysqlConfig struct {
	ConnectorConfig
	Host     string     `json:"Host"`
	Port     string     `json:"Port"`
	Database string     `json:"Database"`
	Table    string     `json:"Table"`
	User     string     `json:"User"`
	Password string     `json:"Password"`
	Ignore   InsertMode `json:"Ignore"`
}

type InsertMode string

const (
	IGNORE    InsertMode = "true"
	OVERWRITE InsertMode = "false"
)

func marshalCreateMysqlConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkMysqlConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		Type          ConnectorType     `json:"Type"`
		SinkStartTime int64             `json:"SinkStartTime"`
		ColumnFields  []string          `json:"ColumnFields"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkMysqlConfig   `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetMysqlConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	ct := &struct {
		GetConnectorResult
		Config SinkMysqlConfig `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	ret := &ct.GetConnectorResult
	ret.Config = ct.Config
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

func marshalUpdateMysqlConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkMysqlConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkMysqlConfig   `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

/*  Ads Config */
type SinkAdsConfig struct {
	SinkMysqlConfig
}

func marshalCreateAdsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkAdsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkAdsConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		Type          ConnectorType     `json:"Type"`
		SinkStartTime int64             `json:"SinkStartTime"`
		ColumnFields  []string          `json:"ColumnFields"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkAdsConfig     `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetAdsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	ct := &struct {
		GetConnectorResult
		Config SinkMysqlConfig `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	ret := &ct.GetConnectorResult
	ret.Config = ct.Config
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

func marshalUpdateAdsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkAdsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkAdsConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkAdsConfig     `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

/*  datahub Config */
type SinkDatahubConfig struct {
	ConnectorConfig
	Endpoint  string   `json:"Endpoint"`
	Project   string   `json:"Project"`
	Topic     string   `json:"Topic"`
	AuthMode  AuthMode `json:"AuthMode"`
	AccessId  string   `json:"AccessId,omitempty"`
	AccessKey string   `json:"AccessKey,omitempty"`
}

func marshalCreateDatahubConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkDatahubConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkDatahubConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		Type          ConnectorType     `json:"Type"`
		SinkStartTime int64             `json:"SinkStartTime"`
		ColumnFields  []string          `json:"ColumnFields"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkDatahubConfig `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetDatahubConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	ct := &struct {
		GetConnectorResult
		Config SinkDatahubConfig `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	ret := &ct.GetConnectorResult
	ret.Config = ct.Config
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

func marshalUpdateDatahubConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkDatahubConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkDatahubConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkDatahubConfig `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

/*  ES Config */
type SinkEsConfig struct {
	ConnectorConfig
	Index        string   `json:"Index"`
	Endpoint     string   `json:"Endpoint"`
	User         string   `json:"User"`
	Password     string   `json:"Password"`
	IDFields     []string `json:"IDFields"`
	TypeFields   []string `json:"TypeFields"`
	RouterFields []string `json:"RouterFields"`
	ProxyMode    bool     `json:"ProxyMode"`
}

func marshalCreateEsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkEsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkEsConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	// server need ProxyMode be string
	type SinkEsConfigHelper struct {
		SinkEsConfig
		ProxyMode string `json:"ProxyMode"`
	}
	confHelper := SinkEsConfigHelper{
		SinkEsConfig: soConf,
		ProxyMode:    strconv.FormatBool(soConf.ProxyMode),
	}

	ct := &struct {
		Action        string             `json:"Action"`
		Type          ConnectorType      `json:"Type"`
		SinkStartTime int64              `json:"SinkStartTime"`
		ColumnFields  []string           `json:"ColumnFields"`
		ColumnNameMap map[string]string  `json:"ColumnNameMap,omitempty"`
		Config        SinkEsConfigHelper `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        confHelper,
	}
	return json.Marshal(ct)
}

func unmarshalGetEsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	type SinkEsConfigHelper struct {
		SinkEsConfig
		IDFields     string `json:"IDFields"`
		TypeFields   string `json:"TypeFields"`
		RouterFields string `json:"RouterFields"`
		ProxyMode    string `json:"ProxyMode"`
	}

	ct := &struct {
		GetConnectorResult
		Config SinkEsConfigHelper `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	conf := ct.Config.SinkEsConfig
	if ct.Config.ProxyMode != "" {
		proxy, err := strconv.ParseBool(ct.Config.ProxyMode)
		if err != nil {
			return nil, err
		}
		conf.ProxyMode = proxy
	}

	idFields := make([]string, 0)
	if ct.Config.IDFields != "" {
		if err := json.Unmarshal([]byte(ct.Config.IDFields), &idFields); err != nil {
			return nil, err
		}
	}
	conf.IDFields = idFields

	typeFields := make([]string, 0)
	if ct.Config.TypeFields != "" {
		if err := json.Unmarshal([]byte(ct.Config.TypeFields), &typeFields); err != nil {
			return nil, err
		}
		conf.TypeFields = typeFields
	}
	conf.TypeFields = typeFields

	routerFields := make([]string, 0)
	if ct.Config.RouterFields != "" {
		if err := json.Unmarshal([]byte(ct.Config.RouterFields), &routerFields); err != nil {
			return nil, err
		}
	}
	conf.RouterFields = routerFields

	ret := &ct.GetConnectorResult
	ret.CommonResponseResult = *commonResp
	ret.Config = conf
	return ret, nil
}

func marshalUpdateEsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkEsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkEsConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkEsConfig      `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

type FcInvokeType string

const (
	FcSync  FcInvokeType = "sync"
	FcAsync FcInvokeType = "async"
)

/*  FC Config */
type SinkFcConfig struct {
	ConnectorConfig
	Endpoint   string       `json:"Endpoint"`
	Service    string       `json:"Service"`
	Function   string       `json:"Function"`
	AuthMode   AuthMode     `json:"AuthMode"`
	AccessId   string       `json:"AccessId,omitempty"`
	AccessKey  string       `json:"AccessKey,omitempty"`
	InvokeType FcInvokeType `json:"InvokeType"`
}

func marshalCreateFcConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkFcConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkFcConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}
	if soConf.InvokeType == "" {
		soConf.InvokeType = FcSync
	}

	ct := &struct {
		Action        string            `json:"Action"`
		Type          ConnectorType     `json:"Type"`
		SinkStartTime int64             `json:"SinkStartTime"`
		ColumnFields  []string          `json:"ColumnFields"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkFcConfig      `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetFcConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	ct := &struct {
		GetConnectorResult
		Config SinkFcConfig `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	ret := &ct.GetConnectorResult
	ret.Config = ct.Config
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

func marshalUpdateFcConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkFcConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkFcConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkFcConfig      `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

type OtsWriteMode string

const (
	OtsPut    OtsWriteMode = "PUT"
	OtsUpdate OtsWriteMode = "UPDATE"
)

/*  Ots Config */
type SinkOtsConfig struct {
	ConnectorConfig
	Endpoint     string       `json:"Endpoint"`
	InstanceName string       `json:"InstanceName"`
	TableName    string       `json:"TableName"`
	AuthMode     AuthMode     `json:"AuthMode"`
	AccessId     string       `json:"AccessId,omitempty"`
	AccessKey    string       `json:"AccessKey,omitempty"`
	WriteMode    OtsWriteMode `json:"WriteMode"`
}

func marshalCreateOtsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkOtsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOtsConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}
	if soConf.WriteMode == "" {
		soConf.WriteMode = OtsPut
	}

	ct := &struct {
		Action        string            `json:"Action"`
		Type          ConnectorType     `json:"Type"`
		SinkStartTime int64             `json:"SinkStartTime"`
		ColumnFields  []string          `json:"ColumnFields"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkOtsConfig     `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetOtsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	ct := &struct {
		GetConnectorResult
		Config SinkOtsConfig `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	ret := &ct.GetConnectorResult
	ret.Config = ct.Config
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

func marshalUpdateOtsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkOtsConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string            `json:"Action"`
		ColumnFields  []string          `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
		Config        SinkOtsConfig     `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

/*  datahub Config */
type SinkHologresConfig struct {
	SinkDatahubConfig
	InstanceId string `json:"InstanceId,omitempty"`
}

func marshalCreateHologresConnector(ccr *CreateConnectorRequest) ([]byte, error) {
	soConf, ok := ccr.Config.(SinkHologresConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkHologresConfig", reflect.TypeOf(ccr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string             `json:"Action"`
		Type          ConnectorType      `json:"Type"`
		SinkStartTime int64              `json:"SinkStartTime"`
		ColumnFields  []string           `json:"ColumnFields"`
		ColumnNameMap map[string]string  `json:"ColumnNameMap,omitempty"`
		Config        SinkHologresConfig `json:"Config"`
	}{
		Action:        "create",
		Type:          ccr.Type,
		SinkStartTime: ccr.SinkStartTime,
		ColumnFields:  ccr.ColumnFields,
		ColumnNameMap: ccr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

func unmarshalGetHologresConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
	ct := &struct {
		GetConnectorResult
		Config SinkHologresConfig `json:"Config"`
	}{}

	if err := json.Unmarshal(data, ct); err != nil {
		return nil, err
	}

	ret := &ct.GetConnectorResult
	ret.Config = ct.Config
	ret.CommonResponseResult = *commonResp
	return ret, nil
}

func marshalUpdateHologresConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
	soConf, ok := ucr.Config.(SinkHologresConfig)
	if !ok {
		return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkHologresConfig", reflect.TypeOf(ucr.Config)))
	}

	// set default value
	if soConf.TimestampUnit == "" {
		soConf.TimestampUnit = ConnectorMicrosecond
	}

	ct := &struct {
		Action        string             `json:"Action"`
		ColumnFields  []string           `json:"ColumnFields,omitempty"`
		ColumnNameMap map[string]string  `json:"ColumnNameMap,omitempty"`
		Config        SinkHologresConfig `json:"Config,omitempty"`
	}{
		Action:        "create",
		ColumnFields:  ucr.ColumnFields,
		ColumnNameMap: ucr.ColumnNameMap,
		Config:        soConf,
	}
	return json.Marshal(ct)
}

type ConnectorOffset struct {
	Timestamp int64 `json:"Timestamp"`
	Sequence  int64 `json:"Sequence"`
}

type ConnectorShardState string

// Deprecated, will be removed in a future version
const (
	Created   ConnectorShardState = "CONTEXT_PLANNED"
	Eexcuting ConnectorShardState = "CONTEXT_EXECUTING"
	Stopped   ConnectorShardState = "CONTEXT_PAUSED"
	Finished  ConnectorShardState = "CONTEXT_FINISHED"
)

const (
	ConnectorShardHang      ConnectorShardState = "CONTEXT_HANG"
	ConnectorShardPlanned   ConnectorShardState = "CONTEXT_PLANNED"
	ConnectorShardExecuting ConnectorShardState = "CONTEXT_EXECUTING"
	ConnectorShardStopped   ConnectorShardState = "CONTEXT_STOPPED"
	ConnectorShardFinished  ConnectorShardState = "CONTEXT_FINISHED"
)

type ConnectorShardStatusEntry struct {
	StartSequence    int64               `json:"StartSequence"`
	EndSequence      int64               `json:"EndSequence"`
	CurrentSequence  int64               `json:"CurrentSequence"`
	CurrentTimestamp int64               `json:"CurrentTimestamp"`
	UpdateTime       int64               `json:"UpdateTime"`
	State            ConnectorShardState `json:"State"`
	LastErrorMessage string              `json:"LastErrorMessage"`
	DiscardCount     int64               `json:"DiscardCount"`
	DoneTime         int64               `json:"DoneTime"`
	WorkerAddress    string              `json:"WorkerAddress"`
}
